去年10月,我写了一篇利用python搭建TG消息推送的文章,当时买了一台几十块钱一年的小鸡来挂这个脚本,虽然隔三差五机器嗝屁,但还是持续使用到了今年8月直到服务器到期,到期后一时没有合适机器来放脚本,于是又花几十块续费了一年,但是今年极其不稳定,每天都嗝屁,甚至有时候嗝好几天,截止本文发布时依旧处于嗝屁状态已连续一周。

今天在抖音刷到视频有人在GitHub放脚本,我突然冒出把推送脚本也放到GitHub的想法,在和豆包简单了解这个想法可行之后,马上开始了实践。

准备工作

联系 Telegram 官方机器人 @BotFather,发送/newbot,按提示设置名称和用户名,最终会获得一个Bot Token(格式:123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11),保存备用。

将你的 Bot 加入群组,然后在群组内发送任意消息,再向 @getidsbot 发送该消息的转发,会返回群组 ID(格式:-1001234567890),保存备用。

开始搭建

新建一个 GitHub 仓库(示例命名rss-telegram-pusher),设置公开状态。

    import feedparser
    import logging
    import asyncio
    import json
    import os
    from telegram import Bot
    from telegram.error import TelegramError
    
    # 从环境变量读取配置
    TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
    CHAT_ID = os.getenv("CHAT_ID")
    RSS_URL = os.getenv("RSS_URL")
    
    # 存储已发送ID的本地文件
    POSTS_FILE = "sent_posts.json"
    
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )
    
    # 读取已发送的post_id
    def load_sent_posts():
        try:
            if os.path.exists(POSTS_FILE):
                with open(POSTS_FILE, "r", encoding="utf-8") as f:
                    content = f.read().strip()
                    return json.loads(content) if content else []
            logging.info("首次运行,创建空ID列表")
            return []
        except Exception as e:
            logging.error(f"读取已发送ID失败:{str(e)}")
            return []
    
    # 保存已发送的post_id
    def save_sent_posts(post_ids):
        try:
            with open(POSTS_FILE, "w", encoding="utf-8") as f:
                json.dump(post_ids, f, ensure_ascii=False, indent=2)
            logging.info(f"已保存ID列表(共{len(post_ids)}条):{post_ids}")
        except Exception as e:
            logging.error(f"保存已发送ID失败:{str(e)}")
    
    # 获取RSS更新
    def fetch_updates():
        try:
            logging.info(f"获取RSS源:{RSS_URL}")
            feed = feedparser.parse(RSS_URL)
            if feed.bozo:
                logging.error(f"RSS解析错误:{feed.bozo_exception}")
                return None
            logging.info(f"成功获取{len(feed.entries)}条RSS条目")
            return feed
        except Exception as e:
            logging.error(f"获取RSS失败:{str(e)}")
            return None
    
    # 转义Markdown特殊字符
    def escape_markdown(text):
        special_chars = r"_*~`>#+-.!()"
        for char in special_chars:
            text = text.replace(char, f"\{char}")
        return text
    
    # 发送单条消息到Telegram(带间隔)
    async def send_message(bot, title, link, delay=3):
        try:
            # 发送前等待指定秒数(避免频率限制)
            await asyncio.sleep(delay)
            escaped_title = escape_markdown(title)
            escaped_link = escape_markdown(link)
            message = f"`{escaped_title}`\n{escaped_link}"
            logging.info(f"发送消息:{message[:100]}")
            await bot.send_message(
                chat_id=CHAT_ID,
                text=message,
                parse_mode="MarkdownV2"
            )
            logging.info("消息发送成功")
            return True
        except TelegramError as e:
            logging.error(f"Telegram发送失败:{str(e)}")
            return False
    
    # 检查更新并推送所有新帖子
    async def check_for_updates(sent_post_ids):
        updates = fetch_updates()
        if not updates:
            return
    
        new_posts = []
        for entry in updates.entries:
            try:
                # 提取帖子ID(适配URL格式)
                guid_parts = entry.guid.split("-")
                if len(guid_parts) < 2:
                    logging.warning(f"无效GUID格式:{entry.guid},跳过")
                    continue
                post_id = guid_parts[-1].split(".")[0]
                if not post_id.isdigit():
                    logging.warning(f"提取的ID非数字:{post_id},跳过")
                    continue
                logging.info(f"解析到有效ID:{post_id},标题:{entry.title[:20]}...")
                if post_id not in sent_post_ids:
                    new_posts.append((post_id, entry.title, entry.link))
            except Exception as e:
                logging.error(f"解析条目失败(GUID:{entry.guid}):{str(e)}")
                continue
    
        if new_posts:
            # 按ID升序排序(从旧到新推送),若想从新到旧则用reverse=True
            new_posts.sort(key=lambda x: int(x[0]))  # 从小到大:旧→新
            # new_posts.sort(key=lambda x: int(x[0]), reverse=True)  # 从大到小:新→旧
    
            logging.info(f"发现{len(new_posts)}条新帖子,准备依次推送(间隔3秒)")
            async with Bot(token=TELEGRAM_TOKEN) as bot:
                # 逐条推送,每条间隔3秒
                for i, (post_id, title, link) in enumerate(new_posts):
                    # 第一条消息延迟0秒,后续每条延迟3秒
                    success = await send_message(bot, title, link, delay=3 if i > 0 else 0)
                    if success:
                        sent_post_ids.append(post_id)  # 仅成功推送的ID才记录
    
            # 保存所有成功推送的ID
            save_sent_posts(sent_post_ids)
        else:
            logging.info("无新帖子需要推送")
    
    # 主函数
    async def main():
        logging.info("===== 脚本开始运行 =====")
        sent_post_ids = load_sent_posts()
        try:
            await check_for_updates(sent_post_ids)
        except Exception as e:
            logging.error(f"主逻辑执行失败:{str(e)}")
        logging.info("===== 脚本运行结束 =====")
    
    if __name__ == "__main__":
        asyncio.run(main())
        name: RSS to Telegram
    on:
      schedule:
        - cron: "*/2 * * * *"  # 每2分钟运行一次
      workflow_dispatch:  # 允许手动触发
    
    jobs:
      run:
        runs-on: ubuntu-latest
        steps:
          # 拉取仓库代码(带令牌认证,后续步骤自动继承权限)
          - name: 拉取仓库代码
            uses: actions/checkout@v4
            with:
              token: ${{ secrets.MY_GITHUB_TOKEN }}  # 用令牌拉取,确保推送权限
              fetch-depth: 0  # 获取完整历史,避免分支冲突
    
          # 设置Python环境
          - name: 设置Python环境
            uses: actions/setup-python@v5
            with:
              python-version: "3.10"
    
          # 安装依赖库
          - name: 安装依赖
            run: pip install python-telegram-bot feedparser
    
          # 运行脚本(检测新内容并推送)
          - name: 运行脚本
            env:
              TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
              CHAT_ID: ${{ secrets.CHAT_ID }}
              RSS_URL: ${{ secrets.RSS_URL }}
            run: python rss_pusher.py
    
          # 提交更新后的ID文件(无repo_token,消除警告)
          - name: 提交更新后的ID文件
            uses: stefanzweifel/git-auto-commit-action@v4
            with:
              commit_message: "更新已发送的帖子ID"
              file_pattern: "sent_posts.json"  # 仅提交ID文件
              branch: main  # 仓库主分支(确保与你的分支名称一致)
              commit_user_name: "GitHub Actions"
              commit_user_email: "[email protected]"
[] # 留空即可

创建地址,创建个人访问令牌(经典),填写自定义令牌名称,有效期建议90天,令牌范围必须勾选repo(写入仓库),完成创建后只会显示一次令牌token,复制保存。

设置仓库密钥

步骤: Settings(设置) → Secrets and variables(和变量) → Actions(行动) → Repository → secrets(存储库机密) → New repository secret(创建存储库机密)

添加以下密钥:

TELEGRAM_TOKEN:你的 Telegram Bot Token
CHAT_ID:目标群组 ID
RSS_URL:你的 RSS 源地址
MY_GITHUB_TOKEN:GitHub 令牌token

添加完成后如图:
1760968222781.png

测试与验证

进入仓库 → 点击顶部 “Actions” → 左侧选择 “RSS to Telegram” → 点击 “Run workflow” → “Run workflow”,手动触发一次运行,查看群内是否有信息,如果没有,在仓库页点击顶部导航栏的Actions标签,进入工作流运行记录页面,可查看报错原因。


注:此脚本用于本论坛使用,适配本论坛帖子url,其它网站需稍作调整才可正常使用。

仓库地址:https://github.com/kannimade/rss-telegram-pusher

论坛原文:https://www.dalao.net/thread-51611.htm